package org.laofeng.nifi.processors;


import com.baidu.aip.ocr.AipOcr;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

//@SideEffectFree
@PrimaryNodeOnly
@Tags({"ocr", "image","picture","jpg", "jpeg", "png", "bmp", "wbmp", "gif"})

//@EventDriven
//@SupportsBatching
//@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("使用百度大脑API，解析图片中的文字。注意这里并没有验证输入FlowFile的Content，如果FlowFile的Content不是图片文件，会解析失败！")
@SideEffectFree

public class OcrAipProcessor extends AbstractProcessor {

    public static final PropertyDescriptor app_id_pd = new PropertyDescriptor.Builder()
            .name("app_id")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor api_key_pd = new PropertyDescriptor.Builder()
            .name("api_key")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor secret_key_pd = new PropertyDescriptor.Builder()
            .name("secret_key_pd")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    String app_id;

    String api_key;

    String secret_key;

    AipOcr client = null;


    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("SUCCESS")
            .description("Succes relationship")
            .build();


    public static final Relationship FAILURE = new Relationship.Builder()
            .name("FAILURE")
            .description("failure relationship")
            .build();

    @OnScheduled
    public void onScheduled(ProcessContext context) {
        this.app_id = context.getProperty(app_id_pd).getValue();
        this.api_key = context.getProperty(api_key_pd).getValue();
        this.secret_key = context.getProperty(secret_key_pd).getValue();

        if (this.client == null){
            this.client = new AipOcr(app_id, api_key, secret_key);
            client.setConnectionTimeoutInMillis(2000);
            client.setSocketTimeoutInMillis(60000);
        }

    }


    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public void init(final ProcessorInitializationContext context){
        List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(app_id_pd);
        properties.add(api_key_pd);
        properties.add(secret_key_pd);
        this.properties = Collections.unmodifiableList(properties);

        Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(SUCCESS);
        relationships.add(FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }


    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile ff = session.get();
        if (ff == null){
            return;
        }

        final AtomicReference<String> value = new AtomicReference<>();
        session.read(ff, new InputStreamCallback() {
            public void process(InputStream in) throws IOException {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                IOUtils.copy(in,out);
                JSONObject jj = client.basicAccurateGeneral(out.toByteArray(), new HashMap<String,String>());

                value.set(export_text(jj));
            }
        });

        String words = value.get();
        if(words != null && !words.isEmpty()){
            FlowFile new_ff = session.write(ff, new OutputStreamCallback() {
                public void process(OutputStream out) throws IOException {
                    out.write(value.get().getBytes());
                }
            });
            String file_name_key = "filename";
            String file_name = ff.getAttribute(file_name_key);
            file_name = file_name+".txt";
            session.putAttribute(new_ff,file_name_key,file_name);
            session.getProvenanceReporter().create(new_ff,"export words from images");
            session.transfer(new_ff, SUCCESS);
        } else {
            session.transfer(ff, FAILURE);
        }
    }
    // 由返回的json对象中导出文本
    private String export_text(JSONObject jo) {

        JSONArray words_result = jo.getJSONArray("words_result");
        //JSONArray words = words_result.getJSONArray("words");
        int size = words_result.length();
        StringBuilder buff = new StringBuilder();
        for (int i = 0; i < size; i++){
            JSONObject word = words_result.getJSONObject(i);
            buff.append(word.getString("words")).append("\n");
        }
        buff.deleteCharAt(buff.length() - 1);

        return buff.toString();
    }
}
